Dataformワークフローをシェルスクリプトでポーリングしながら動かす
やりたいこと
Dataformワークフローを起動した後、実行状態をポーリングして完了したら次のDataformワークフローを動かすということがやりたかったです。
というのも、以前作成したシェルスクリプトはどんどんDataformワークフローを起動していきますが、Dataformワークフロー実行APIは非同期APIとなり実行後は実行状態を返してくるだけで完了したかどうかは実行状態を問い合わせるAPIを打って確認しないとわかりません。
一つのワークフローの実行が完了したら次のワークフローを実行する、ということを自動化したかったのでワークフローの実行状態をポーリングして、ワークフロー実行完了が返ってきたら次のワークフローを実行するというシェルスクリプトを実装しました。前日データ作成完了後に次の日のデータを作成する必要がある、というユースケースの時にこのシェルスクリプトは向いています。
また、開始日、終了日を引数にしていますが配列でコンパイル変数の内容を持たせてそれを反復処理する、なんていうことにも少し手を入れれば使えると思います。
処理の流れ
シェルスクリプト全文は以下になります。
#!/bin/bash # 開始日と終了日を指定(yyyy-mm-dd形式) START_DATE="2023-01-01" END_DATE="2023-02-10" PROJECT_ID="プロジェクトID" REPO_NAME="リポジトリ名" POLLING_INTERVAL=30 # タイムアウトの上限(秒) TIMEOUT_LIMIT=1800 # 日付フォーマット変換関数 format_date() { date -d "$1" +"%Y%m%d" } # 開始日と終了日をyyyymmdd形式に変換 start_date=$(format_date "$START_DATE") end_date=$(format_date "$END_DATE") # 開始日から終了日までの期間をループ current_date="$START_DATE" while [ "$(format_date "$current_date")" -le "$end_date" ]; do # 日付をyyyymmdd形式で出力 yyyymmdd_val=$(format_date "$current_date") yyyymmdd="\'${yyyymmdd_val}\'" echo "Processing date: $yyyymmdd" # Compilation resultの作成リクエスト comp_result_response=$(curl -X POST "https://dataform.googleapis.com/v1beta1/projects/$PROJECT_ID/locations/asia-northeast1/repositories/$REPO_NAME/compilationResults" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json" \ --data "{\"gitCommitish\":\"main\", \"codeCompilationConfig\":{\"vars\":{\"compile_val\":\"$yyyymmdd\"}}}") # レスポンスからcompilationResultsのname値を抽出 comp_result_name=$(echo "$comp_result_response" | jq -r '.name') echo "Compilation Result Name: $comp_result_name" # Workflow invocationの作成リクエスト workflow_invocation_response=$(curl -X POST "https://dataform.googleapis.com/v1beta1/projects/$PROJECT_ID/locations/asia-northeast1/repositories/$REPO_NAME/workflowInvocations" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json" \ --data "{\"compilationResult\":\"$comp_result_name\"}") workflow_invocation_name=$(echo "$workflow_invocation_response" | jq -r '.name') echo "Workflow Invocation Name: $workflow_invocation_name" # ワークフローの状態をポーリング start_time=$(date +%s) while true; do status_response=$(curl -X GET "https://dataform.googleapis.com/v1beta1/$workflow_invocation_name" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json") workflow_state=$(echo "$status_response" | jq -r '.state') echo "Current Workflow State: $workflow_state" if [[ "$workflow_state" == "SUCCEEDED" ]]; then echo "Workflow succeeded." break elif [[ "$workflow_state" == "FAILED" ]]; then echo "Workflow failed." exit 1 else current_time=$(date +%s) elapsed_time=$((current_time - start_time)) if [[ $elapsed_time -ge $TIMEOUT_LIMIT ]]; then echo "Workflow polling timed out after $elapsed_time seconds." exit 1 fi echo "Workflow still in progress. Current state: $workflow_state. Waiting..." sleep $POLLING_INTERVAL fi done current_date=$(date -I -d "$current_date + 1 day") done
ポーリングを行なっている以下の箇所が、このシェルスクリプトの肝になるところです。
while true; do status_response=$(curl -X GET "https://dataform.googleapis.com/v1beta1/$workflow_invocation_name" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json") workflow_state=$(echo "$status_response" | jq -r '.state') echo "Current Workflow State: $workflow_state" if [[ "$workflow_state" == "SUCCEEDED" ]]; then echo "Workflow succeeded." break elif [[ "$workflow_state" == "FAILED" ]]; then echo "Workflow failed." exit 1
この処理があるので、実行したDataformワークフローが実行完了したら、次のコンパイル結果を再びDataformワークフローを実行することができます。
Dataformワークフローを実行したレスポンスのname
パラメタを引数に以下のAPIを叩くとDataformワークフローの実行状態(state
)を取得することができます。
https://dataform.googleapis.com/v1beta1/$workflow_invocation_name"
Dataformワークフローの実行状態は以下のstateのどれかになります。
今回の実装では、SUCCEEDED
だったら後続処理実行、FAILED
だったらシェルスクリプト自体を終了するようにしています。
state | 概要 |
---|---|
STATE_UNSPECIFIED | 初期値。使用されることはない |
RUNNING | ワークフローが実行中 |
SUCCEEDED | 成功 |
CANCELLED | キャンセル |
FAILED | 失敗 |
CANCELING | キャンセル中 |
またPOLLING_INTERVAL
変数でポーリング間隔を調整することができます。
Dataformの実行結果がFAILED
とSUCCEEDED
以外になってしまった場合に無限ループしないようにポーリングのタイムアウト時間もTIMEOUT_LIMIT
で設定をしております。適宜ワークロードに則って値を調整すると良いと考えます。
まとめ
前回作成したシェルスクリプトはポーリング処理をしないでとにかく指定日付分Dataformワークフローを実行するというものでした。
DataformをAPIで実行する場合、前後関係がある実行をしたい場合は今回のようなポーリングをしてあげる必要があります。
この記事がどなたかのお役に立てば嬉しいです。それではまた。